package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/11/20
 */
public class Demo5_ReadJDBC
{
    public static void main(String[] args) {

        //读取外部系统直接获取一个Table，无需流的环境。直接定义表的环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        //定义建表语句
        // flink中的数据类型:  https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/types/
        String createTableSql = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'jdbc',   " +
            "  'url' = 'jdbc:mysql://hadoop102:3306/Mybatis?useSSL=false&useUnicode=true&characterEncoding=UTF-8', " +
            "  'table-name' = 'ws'   ," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'   ," +
            "  'username' = 'root'   ," +
            "  'password' = '000000'   " +
            ")";

        //insert，create语句，需要使用executeSql执行
        tableEnv.executeSql(createTableSql);

        //执行查询
        tableEnv.sqlQuery("select * from t1")
            .execute()
            .print();


    }
}
